package com.ssm.listener;

import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;

/**
 * 功能描述: kafka客户端消费，监听模型实现，通过注解 KafkaListener注解，添加监听的topics。
 *如果需要批量监听消息，通过注解中的containerFactory指定监听使用的容器工厂，在容器工厂里面，设置是否启用
 * 批量监听: 例如
 * ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
 *                 new ConcurrentKafkaListenerContainerFactory<>();
 *         factory.setConsumerFactory(consumerFactory());
 *         // 设置监听客户端的并发监听数量，其实也就是客户端监听的线程数
 *         factory.setConcurrency(3);
 *
 *         // 设置批量监听,一次获取多条数据进行处理,每次获取的消息数量  <= max.poll.records
 *         // 如果不设置批量监听，spring的消息监听，将一条一条的获取消息数据
 *         factory.setBatchListener(false);
 * 如果一个工程里面，根据业务需要，有需要一条一条的处理消息，有需要批量处理消息的，那么根据需要，设置多个容器工厂。
 * 最后只需要在直接中，指定使用不同的容器工厂即可
 * @see com.ssm.config.KafkaConfig
 * @author lixiaomeng
 * @date 2018/12/11
 */
public class KafkaConsumeListener {

    /**
     *
     * 功能描述: 一条一条消息的获取
     *
     * @param: [foo]
     * @return: void
     * @author: lixiaomeng
     * @time: 2018/12/11 17:49
     */
    @KafkaListener( id = "spring_one", topics = "springKafka", clientIdPrefix = "one", containerFactory = "kafkaOneListener")
    public void listen1(String foo) {
        System.out.println( Thread.currentThread().getName() + "KafkaConsumeListener== " + foo);
    }

    /**
     *
     * 功能描述: 批量监听数据
     *
     * @param: []
     * @return: void
     * @author: lixiaomeng
     * @time: 2018/12/11 16:25
     */
    @KafkaListener( id = "spring_batch", topics = "springKafka_batch", clientIdPrefix = "batch", containerFactory = "kafkaBatchListener")
    public void listen(List<String> list) {
        System.out.println( Thread.currentThread().getName() +  " == batch_list == " + list.size());
    }
}
